package org.zjvis.datascience.service.dag;

import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.zjvis.datascience.common.dto.TaskInstanceDTO;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.zjvis.datascience.common.constant.DatabaseConstant.DEFAULT_DATASET_ID;


/**
 * @description 常规任务ETL等执行调度器
 * @date 2021-12-10
 */
public class TaskRunner implements Callable<TaskRunnerResult> {
    private String errorTpl = "{\"status\":500, \"error_msg\":\"%s\"}";
    private int maxRetryTimes = 3;
    private int sleepTime = 20000;
    private GPDataProvider provider;
    private TaskInstanceDTO instance;

//    @Deprecated
    public TaskRunner(GPDataProvider provider, TaskInstanceDTO instance) {
        this.provider = provider;
        this.instance = instance;
    }


    @Override
    public TaskRunnerResult call() throws InterruptedException {
        int index = 0;
        TaskRunnerResult result = null;
        if (instance.hasPrecautionaryError()) {
            return new TaskRunnerResult(500, String.format(errorTpl, "error happens when init stage."));
        }
        while (index < maxRetryTimes) {
            try {
                result = this.exec();
                if (result.getStatus() == 0) {
                    break;
                }
                ++index;
            } catch (Exception e) {
                ++index;
                Thread.sleep(sleepTime);
                result = new TaskRunnerResult(500, String.format(errorTpl, e.getMessage().replaceAll("\"", "'")));
            }
        }
        return result;
    }

    public TaskRunnerResult exec() {
        String sql = instance.getSqlText();
        // TODO 判断状态
        if (StringUtils.isEmpty(sql)) {
            return new TaskRunnerResult(0, null);
        }
//        ServletContext servletContext = (ServletContext) SpringContextJobUtil.getBean("servletContext");
//        String dataSourceKey = (String) servletContext.getAttribute(Constant.DEFAULT_DATA_SOURCE_KEY);
        Connection conn = null;
        Statement st = null;
        try {
            if (ObjectUtil.isNotNull(provider)) {
                conn = provider.getConn(DEFAULT_DATASET_ID);
                st = conn.createStatement();
            }
        } catch (Exception e) {
            JDBCUtil.close(conn, null, null);
            return new TaskRunnerResult(500, String.format(errorTpl, e.getMessage().replaceAll("\"", "'")));
        }

        if (sql.startsWith("CREATE VIEW") || sql.startsWith("create view") || sql.toLowerCase().startsWith("create table")) {
            try {
                st.execute(sql);
            } catch (Exception e) {
                return new TaskRunnerResult(500, String.format(errorTpl, e.getMessage().replaceAll("\"", "'")));
            }
            finally {
                JDBCUtil.close(conn, st, null);
            }
            return new TaskRunnerResult(0, null);
        }

        AtomicReference<String> output = new AtomicReference<String>();
        AtomicInteger status = new AtomicInteger(-1);
        try {

//                RpcResult<String> stringRpcResult = service.executeSlowSQL(dataSourceKey, sql, String.class);
            ResultSet rs = st.executeQuery(sql);
            while (rs.next()) {
                output.set(rs.getString(1));
                break;
            }
            if (sql.contains("sys_func_pivot_table_view_create")) {
                if ("成功".equals(output.get())) {
                    status.set(0);
                } else {
                    status.set(500);
                }
            } else {
                JSONObject jsonObject = JSONObject.parseObject(output.get());
                status.set(jsonObject.getInteger("status"));
            }


        } catch (Exception e) {
            return new TaskRunnerResult(500, String.format(errorTpl, e.getMessage().replaceAll("\"", "'")));
        }
        finally {
            JDBCUtil.close(conn, st, null);
        }

        return new TaskRunnerResult(status.get(), output.get());
    }

}
